定期的にDynamoDBのデータをS3バケットにCSV形式で保存するLambdaを作成してみた
はじめに
Amazon DynamoDBのデータを毎日Amazon S3バケットへCSV形式で保存するAWS Lambdaを作成する機会がありましたので、紹介します。
構成としては以下のとおりです。
- Amazon EventBridge Schedulerで毎日01:00にLambdaを呼出します
- Lambdaは、DynamoDBから前日に保存されたデータを取得します。
- LambdaでCSV形式に変換し、S3バケットに保存します。
DynamoDBのテーブルは、属性としてデータを保存した日付が入っていれば、テーブルは何でもよいですが、今回は以下の記事で作成したテーブルを利用します。
本記事で使用するDynamoDBのテーブルには、start_date
(保存した日付)の属性が必要です。存在しない場合は、新たに追加してください。
- 保存した日付:
start_date
(必須)- 例:2024-04-26
- 保存した日時:
start_time
- 例:2024-04-26T09:40:18+0900
本記事では1日ごとにCSV形式で保存していますが、その場合、GSIのソートキー(保存した日時start_time
)は不要です。
ただし、1時間など時間単位でCSV形式で保存する場合、ソートキー(時間が記載された属性)も必要になりますので、今回は使わないですがソートキーを設定しています。
DynamoDB グローバルセカンダリインデックス作成
以下のようにグローバルセカンダリインデックス(GSI)を作成します。
- パーティションキー:
start_date
- ソートキー:
start_time
- インデックス名:start_date-start_time-index
Lambda作成
作成するLambdaの設定は以下の通りです。
- ランタイム:Python3.12
- タイムアウト:10秒
- 追加するIAMポリシー
- AmazonDynamoDBReadOnlyAccess
- AmazonS3FullAccess
- 環境変数
DYNAMODB_TABLE_NAME
(DynamoDBテーブル名)S3_BUCKET_NAME
(S3バケット名)DYNAMODB_GSI_NAME
(DynamoDBテーブルのGSI名)
import boto3 import csv import json import os from datetime import datetime, timedelta, timezone from io import StringIO from boto3.dynamodb.conditions import Key TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME'] BUCKET_NAME = os.environ['S3_BUCKET_NAME'] GSI_NAME = os.environ['DYNAMODB_GSI_NAME'] START_DATE = 'start_date' START_TIME = 'start_time' def get_dynamodb_data(yesterday_date): dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(TABLE_NAME) start_date = yesterday_date.strftime('%Y-%m-%dT00:00:00+0900') end_date = (yesterday_date + timedelta(days=1)).strftime('%Y-%m-%dT00:00:00+0900') formatted_date_key = yesterday_date.strftime('%Y-%m-%d') response = table.query( IndexName=GSI_NAME, KeyConditionExpression=Key(START_DATE).eq(formatted_date_key) & Key(START_TIME).between(start_date, end_date) ) return response['Items'] def create_csv(dynamo_records): csv_data = StringIO() fieldnames = [key for key in dynamo_records[0].keys() if key != START_DATE] writer = csv.DictWriter(csv_data, fieldnames=fieldnames) writer.writeheader() for record in dynamo_records: filtered_record = {key: record[key] for key in fieldnames} writer.writerow(filtered_record) return csv_data.getvalue() def upload_to_s3(csv_data, yesterday_date): s3 = boto3.client('s3') year = yesterday_date.strftime("%Y") month = yesterday_date.strftime("%m") s3_key = f'{year}/{month}/{TABLE_NAME}-{yesterday_date.strftime("%Y-%m-%d")}.csv' s3.put_object( Body=csv_data, Bucket=BUCKET_NAME, Key=s3_key ) return s3_key def get_yesterday_date(): jst = timezone(timedelta(hours=9)) today = datetime.now(jst).date() return today - timedelta(days=1) def lambda_handler(event, context): print('event:' + json.dumps(event, ensure_ascii=False)) yesterday_date = get_yesterday_date() dynamo_records = get_dynamodb_data(yesterday_date) if not dynamo_records: print("No data found for the given date.") return csv_data = create_csv(dynamo_records) s3_key = upload_to_s3(csv_data, yesterday_date) print(f'Successfully uploaded CSV to S3: s3://{BUCKET_NAME}/{s3_key}')
- 関数名
get_dynamodb_data
では、GSIを使用して前日保存されたデータを検索します。DynamoDBに前日保存されたデータが無い場合、空の配列が返ります。 - 関数名
create_csv
では、取得したDynamoDBのデータをCSV形式に変換します。必要な属性値のみをCSV形式に出力し、start_date
は不要だったため、除外しています。
EventBridge Scheduler
Lambdaを毎日呼び出すため、EventBridge Schedulerを作成します。
毎日午前1時に実行させます。
cron式は以下の通りです
0 1 * * ? *
作成したLambdaを選択します。
再試行数を調整し、他はデフォルトのまま次に進み作成します。
ちなみに、作成されたIAMロールのIAMポリシーは、作成したLambdaに対してlambda:InvokeFunction
権限の許可が設定されています。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:Lambda名:*", "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxx:function:Lambda名" ] } ] }
確認してみる
翌日、S3バケットを確認したところ、CSV形式で正しく保存されていました。
中身も問題なくCSV形式であることが確認できました。
Lambdaのログ(EventBridge Schedulerから受け取るイベント内容)は以下の通りです
{ "version": "0", "id": "a0662d21-0082-48cf-8674-a39e2c497ffa", "detail-type": "Scheduled Event", "source": "aws.scheduler", "account": "xxxxxxxxxxx", "time": "2024-04-28T16:00:00Z", "region": "ap-northeast-1", "resources": [ "arn:aws:scheduler:ap-northeast-1:xxxxxxxxxxx:schedule/default/Lambda名" ], "detail": "{}" }
time
はUTCです。
最後に
DynamoDBのデータを毎日S3バケットにCSV形式で保存するための実現方法を紹介しました。
本記事では1日分のデータ処理を紹介しましたが、DynamoDBの属性やLambdaのコード、EventBridge Schedulerを修正することで1週間や1ヶ月分のデータ処理も可能です。
CSV形式で出力後は、DynamoDBのデータはTTL(Time to Live)機能で自動削除してもよいです。
参考になれば幸いです。